from collections import namedtuple, defaultdict
from ortools.sat.python import cp_model
import plotly.express as px
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Create Jobs Data Using Random Numbers for Processing Time
def create_jobs_example(jobs, machines, max_duration=20, seed=23):
    """Generate a reproducible random job-shop instance.

    Every job gets one task per machine; the machine order inside each job
    is a random permutation of ``0 .. machines - 1``.

    Args:
        jobs: Number of jobs to generate.
        machines: Number of machines (and therefore tasks per job).
        max_duration: Processing times are drawn uniformly from
            ``1 .. max_duration`` (inclusive).
        seed: Seed for the random number generator.

    Returns:
        A ``(jobs_data, init)`` tuple.  ``jobs_data`` is a list with one
        entry per job, each a list of ``(machine_id, processing_time)``
        tuples.  ``init`` is a list with one integer per machine, drawn
        uniformly from ``0 .. 3 * max_duration - 1``.
    """
    # A private RandomState produces the same sequence as seeding the global
    # generator, but does not clobber NumPy's process-wide random state.
    rng = np.random.RandomState(seed)
    durations = (1 + rng.randint(max_duration, size=(jobs, machines))).tolist()
    init = rng.randint(3 * max_duration, size=machines).tolist()
    jobs_data = [
        list(zip(rng.permutation(len(row)).tolist(), row))
        for row in durations
    ]
    return jobs_data, init
# Small hand-written example, kept for reference only: it is overwritten by
# the randomly generated instance in the next statement.
jobs_data = [  # task = (machine_id, processing_time).
    [(0, 3), (1, 2), (2, 2)],  # Job0
    [(0, 2), (2, 1), (1, 4)],  # Job1
    [(1, 4), (2, 3)],  # Job2
    [(0, 6), (1, 1), (2, 1)]  # Job3
]
# Random instance: 12 jobs, 9 machines, processing times in 1..6.
jobs_data, machine_init = create_jobs_example(12, 9, 6)
machines_count = 1 + max(task[0] for job in jobs_data for task in job)
all_machines = range(machines_count)
# Upper bound on any start/end time.  Running every task back to back takes
# the sum of all durations, but no task may start before its machine's
# release time (machine_init), so the latest release time must be added;
# without it a large release time makes the model infeasible.
horizon = max(machine_init, default=0) + sum(
    task[1] for job in jobs_data for task in job)
model = cp_model.CpModel()

# Record bundling the three solver objects that describe one task.
task_type = namedtuple('task_type', 'start end interval')
# Record describing one scheduled task when the solution is read back.
assigned_task_type = namedtuple('assigned_task_type',
                                'start job index duration')

# Create a start, an end and a fixed-size interval for every task, keyed by
# (job, task position), and collect the intervals per machine.
all_tasks = {}
machine_to_intervals = defaultdict(list)
for j, tasks in enumerate(jobs_data):
    for k, entry in enumerate(tasks):
        machine_id, proc_time = entry[0], entry[1]
        tag = '_%i_%i' % (j, k)
        s = model.NewIntVar(0, horizon, 'start' + tag)
        e = model.NewIntVar(0, horizon, 'end' + tag)
        iv = model.NewIntervalVar(s, proc_time, e, 'interval' + tag)
        all_tasks[j, k] = task_type(start=s, end=e, interval=iv)
        machine_to_intervals[machine_id].append(iv)
# Disjunctive constraints: the intervals assigned to one machine must not
# overlap.
for m in all_machines:
    model.AddNoOverlap(machine_to_intervals[m])

# Precedence constraints: within a job, each task starts no earlier than the
# end of the task before it.
for j, tasks in enumerate(jobs_data):
    for k in range(1, len(tasks)):
        model.Add(all_tasks[j, k].start >= all_tasks[j, k - 1].end)

# Release times: every task on a machine starts no earlier than that
# machine's initial busy time (machine_init).
for m in all_machines:
    release = machine_init[m]
    for iv in machine_to_intervals[m]:
        model.Add(iv.StartExpr() >= release)
# Objective: minimise the makespan, defined as the largest end time among
# the last task of each job.
last_task_ends = [
    all_tasks[j, len(tasks) - 1].end
    for j, tasks in enumerate(jobs_data)
]
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(obj_var, last_task_ends)
model.Minimize(obj_var)
# Solve the model and report the elapsed time of the run.
begin = datetime.now()
solver = cp_model.CpSolver()
# Sets a time limit of 100 seconds.
solver.parameters.max_time_in_seconds = 100.0
status = solver.Solve(model)
elapsed = datetime.now() - begin
print(elapsed)
# Output: 0:00:00.310662
# Read the schedule back from the solver.  On success this defines
# `assigned_jobs` (scheduled tasks per machine), `tograph` (rows for the
# Gantt chart) and `output` (a plain-text rendering of the schedule).
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('Solution Status:', solver.StatusName())

    # Create one list of assigned tasks per machine.
    assigned_jobs = defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            assigned_jobs[task[0]].append(
                assigned_task_type(
                    start=solver.Value(all_tasks[job_id, task_id].start),
                    job=job_id,
                    index=task_id,
                    duration=task[1]))

    # Gantt rows: schedule time t is drawn as t days after a fixed origin date.
    datetime_0 = datetime.strptime("2023-04-01", "%Y-%m-%d")
    # One "WIP" bar per machine covering the time before it becomes available.
    tograph = [
        {"Job": "WIP_M{:02d}".format(m), "Machine": "Machine_{:02d}".format(m),
         "Start": datetime_0, "Finish": datetime_0 + timedelta(days=t)}
        for m, t in enumerate(machine_init)
    ]

    # Create per machine output lines.
    output_lines = []
    for machine in all_machines:
        # Tuples compare field by field and `start` is the first field, so
        # this sorts by starting time.
        assigned_jobs[machine].sort()
        current_machine = "Machine_{:02d}".format(machine)
        label = 'Machine ' + str(machine) + ': '
        names = []
        spans = []
        for assigned_task in assigned_jobs[machine]:
            start = assigned_task.start
            finish = start + assigned_task.duration
            # Build each Gantt row in one literal (same key order as before).
            tograph.append({
                "Job": "Job_{:02d}".format(assigned_task.job),
                "Machine": current_machine,
                "Start": datetime_0 + timedelta(days=start),
                "Finish": datetime_0 + timedelta(days=finish),
            })
            name = 'job_%i_task_%i' % (assigned_task.job, assigned_task.index)
            # Pad to a fixed width so the columns line up.
            names.append('%-15s' % name)
            spans.append('%-15s' % ('[%i,%i]' % (start, finish)))
        output_lines.append(label + ''.join(names) + '\n')
        # Indent the interval row by the label width so each [start,end]
        # sits directly under its task name.
        output_lines.append(' ' * len(label) + ''.join(spans) + '\n')
    output = ''.join(output_lines)

    # Finally print the solution found.
    print(f'Optimal Schedule Length: {solver.ObjectiveValue()}')
    print(f'Best Objective Bound : {solver.BestObjectiveBound()}')
    #print(output)
else:
    print('No solution found.')
# Output:
# Solution Status: OPTIMAL
# Optimal Schedule Length: 69.0
# Best Objective Bound : 69.0
# Gantt chart of the schedule: one row per "Job" label, bars coloured by
# machine, spanning each row's Start..Finish.
schedule_df = pd.DataFrame(tograph)
fig = px.timeline(schedule_df, x_start="Start", x_end="Finish",
                  y="Job", color="Machine")
# Order the rows on the y axis by category name, descending.
fig.update_yaxes(categoryorder="category descending")
fig.show()
# Per-machine utilisation report.
#   Time Available: end of the last task scheduled on the machine.
#   Time Used:      total processing time on the machine plus its initial
#                   busy time (machine_init).
#   Time Idle:      the difference between the two.
print("MACHINE IDLE TIME\n")
for m in all_machines:
    print("Machine_{:02d}".format(m), end='')
    intervals = machine_to_intervals[m]
    # Read sizes through the solver, like the end times, so the values are
    # plain integers.
    sizes = [solver.Value(iv.SizeExpr()) for iv in intervals]
    ends = [solver.Value(iv.EndExpr()) for iv in intervals]
    # A machine with no tasks is only busy for its initial time.
    t_tot = max(ends, default=machine_init[m])
    t_work = sum(sizes) + machine_init[m]
    t_idle = t_tot - t_work
    # Avoid dividing by zero when a machine was never in use.
    idle_pct = t_idle / t_tot if t_tot else 0.0
    print(" Time Available:{} Time Used:{:3d} Time Idle:{:3d} Idle Pct:{:6.1%}".format(
        t_tot, t_work, t_idle, idle_pct))
# Output:
# MACHINE IDLE TIME
# Machine_00 Time Available:65 Time Used: 64 Time Idle: 1 Idle Pct: 1.5%
# Machine_01 Time Available:68 Time Used: 53 Time Idle: 15 Idle Pct: 22.1%
# Machine_02 Time Available:69 Time Used: 44 Time Idle: 25 Idle Pct: 36.2%
# Machine_03 Time Available:68 Time Used: 59 Time Idle: 9 Idle Pct: 13.2%
# Machine_04 Time Available:63 Time Used: 59 Time Idle: 4 Idle Pct: 6.3%
# Machine_05 Time Available:63 Time Used: 41 Time Idle: 22 Idle Pct: 34.9%
# Machine_06 Time Available:69 Time Used: 47 Time Idle: 22 Idle Pct: 31.9%
# Machine_07 Time Available:65 Time Used: 40 Time Idle: 25 Idle Pct: 38.5%
# Machine_08 Time Available:69 Time Used: 61 Time Idle: 8 Idle Pct: 11.6%
# Per-job response time: the latest finish (start + duration) among the
# job's scheduled tasks, followed by the mean over all jobs.
print("JOB RESPONSE TIME\n")
response = [0] * len(jobs_data)
for m in all_machines:
    for scheduled in assigned_jobs[m]:
        finish = scheduled.start + scheduled.duration
        if finish > response[scheduled.job]:
            response[scheduled.job] = finish
for job_no, finish_time in enumerate(response):
    print("Job {:2d} Response Time = {:3d}".format(job_no, finish_time))
print("\nAverage Response Time = {:.1f}".format(np.mean(response)))
# Output:
# JOB RESPONSE TIME
# Job 0 Response Time = 44
# Job 1 Response Time = 66
# Job 2 Response Time = 69
# Job 3 Response Time = 65
# Job 4 Response Time = 69
# Job 5 Response Time = 63
# Job 6 Response Time = 68
# Job 7 Response Time = 68
# Job 8 Response Time = 62
# Job 9 Response Time = 56
# Job 10 Response Time = 69
# Job 11 Response Time = 67
# Average Response Time = 63.8